
[SPARK-55793][CORE] Add multiple log directories support to SHS#54575

Closed
sarutak wants to merge 9 commits into apache:master from sarutak:shs-multi-log-dirs

Conversation


@sarutak sarutak commented Mar 2, 2026

What changes were proposed in this pull request?

This PR proposes to add multiple log directories support to SHS, allowing it to monitor event logs from multiple directories simultaneously.

This PR extends spark.history.fs.logDirectory to accept a comma-separated list of directories (e.g., hdfs:///logs/prod,s3a://bucket/logs/staging). Directories can be on the same or different filesystems. Also, a new optional config spark.history.fs.logDirectory.names is added which allows users to assign display names to directories by position (e.g., Production,Staging). Empty entries fall back to the full path. Duplicate display names are rejected at startup.
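
The parsing and fallback rules above can be sketched as follows. This is a hypothetical illustration, not the PR's actual code; the function name `parseLogDirs` and its shape are assumptions:

```scala
// Hypothetical sketch of the config handling described above.
def parseLogDirs(dirsConf: String, namesConf: Option[String]): Seq[(String, String)] = {
  // Empty entries between commas are dropped from the directory list
  val dirs = dirsConf.split(",").map(_.trim).filter(_.nonEmpty).toSeq
  val rawNames = namesConf.map(_.split(",", -1).map(_.trim).toSeq).getOrElse(Seq.empty)
  // A name-count mismatch falls back to full paths for every directory
  val names = if (rawNames.length == dirs.length) rawNames else Seq.empty
  val resolved = dirs.zipWithIndex.map { case (dir, i) =>
    // An empty name entry falls back to the full path
    val name = if (i < names.length && names(i).nonEmpty) names(i) else dir
    (dir, name)
  }
  // Duplicate display names are rejected at startup
  val display = resolved.map(_._2)
  require(display.distinct.length == display.length,
    s"Duplicate display names: ${display.mkString(",")}")
  resolved
}
```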

Behavior of existing spark.history.fs.* settings with multiple directories:

All existing settings apply globally — there are no per-directory configurations.

| Setting | Behavior |
| --- | --- |
| `update.interval` | One scan cycle covers all directories sequentially |
| `cleaner.interval` | One cleaner cycle operates on the unified listing across all directories |
| `cleaner.maxAge` | Applied to each log entry regardless of which directory it belongs to |
| `cleaner.maxNum` | Total count across all directories; oldest entries are removed first regardless of directory |
| `numReplayThreads` | Thread pool is shared across all directories |
| `numCompactThreads` | Thread pool is shared across all directories |
| `eventLog.rolling.maxFilesToRetain` | Applied per-directory independently |
| `update.batchSize` | Applied per-directory independently |
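
For concreteness, a hypothetical `spark-defaults.conf` fragment for the two-directory example above might look like this (the paths and values are illustrative, not recommendations):

```
spark.history.fs.logDirectory        hdfs:///logs/prod,s3a://bucket/logs/staging
spark.history.fs.logDirectory.names  Production,Staging
spark.history.fs.update.interval     10s
spark.history.fs.cleaner.maxNum      5000
```

With this configuration, one 10s update cycle scans both directories sequentially, and the 5000-entry cap applies to the combined listing across both.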

Regarding UI changes, a "Log Source" column is added to the History UI table showing the display name (or full path) for each application, with a tooltip showing the full path.

(screenshot: all-log-dirs)

Users can filter applications by their log directory using Filter by Log Source dropdown.
(screenshot: filter-by-log-dir)

The Event log directory section in the History UI collapses into a `<details>`/`<summary>` element when multiple directories are configured.
(screenshots: unexpand, expand)

Why are the changes needed?

Some organizations run multiple clusters and have a corresponding log directory for each cluster. If SHS supports multiple log directories, it can serve as a single endpoint for viewing event logs, which helps such organizations.

Does this PR introduce any user-facing change?

Yes, but it will not affect existing users.

How was this patch tested?

Manually confirmed the WebUI behavior (see the screenshots above) and added new tests.

Was this patch authored or co-authored using generative AI tooling?

Kiro CLI / Opus 4.6

Member

@dongjoon-hyun dongjoon-hyun left a comment


Thank you, @sarutak .

I understand prod and staging use cases.

What happens when there exists a conflict among the log directories? For example, a user wants to use this as a kind of multi-tier log management like the following and copies from shorterm to longterm? Of course, the sync operation is non-atomic.

  • hdfs://spark-events/shorterm
  • hdfs://spark-events/longterm

What is the semantics of the ordering in the config value? Especially when we have SPARK-52914?

@dongjoon-hyun
Member

Could you fix the CI failures?

```
[info] *** 24 TESTS FAILED ***
[error] Failed: Total 4431, Failed 24, Errors 0, Passed 4407, Ignored 28, Canceled 6
[error] Failed tests:
[error] 	org.apache.spark.deploy.history.RocksDBBackendHistoryServerSuite
[error] 	org.apache.spark.deploy.history.LevelDBBackendHistoryServerSuite
[error] (core / Test / test) sbt.TestsFailedException: Tests unsuccessful
```

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-55793][WEBUI] Add multiple log directories support to SHS [SPARK-55793][CORE] Add multiple log directories support to SHS Mar 2, 2026
@sarutak
Member Author

sarutak commented Mar 2, 2026

@dongjoon-hyun Thank you for your interest.

What happens when there exists a conflict among the log directories? For example, a user wants to use this as a kind of multi-tier log management like the following and copies from shorterm to longterm? Of course, the sync operation is non-atomic.

hdfs://spark-events/shorterm
hdfs://spark-events/longterm

Each event log file is tracked by its full path as the key in LogInfo. So if the same application's event log exists in both directories, they are treated as separate entries.
I didn't anticipate this kind of usage, but during a non-atomic copy, the incomplete log file in the destination directory may fail to parse or show incomplete information temporarily. However, on the next scan cycle, shouldReloadLog (invoked through checkForLogs) detects the file size change and re-parses it, so the entry self-corrects once the copy completes.

What is the semantic on the ordering in the config value? Especially, when we have SPARK-52914 ?

The ordering of directories in the config value carries no semantics. All directories are scanned equally in each polling cycle (checkForLogs iterates over all logDirs). The order does not affect priority.

On-demand loading operates per log file within checkForLogsInDir, which is called independently for each directory. There is no cross-directory interaction, so I believe multiple directories support and on-demand loading are orthogonal and work together without issues.
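
The scan structure described in the two answers above can be sketched roughly as follows. The method names mirror the PR's, but the signatures and bodies are simplified assumptions:

```scala
// Illustrative sketch: one update cycle walks every configured directory in
// turn, and a failure scanning one directory does not stop the others.
def checkForLogs(logDirs: Seq[String], scanDir: String => Seq[String]): Seq[String] = {
  logDirs.flatMap { dir =>
    try {
      scanDir(dir) // per-directory scan, standing in for checkForLogsInDir
    } catch {
      case _: java.io.IOException => Seq.empty // skip an unreadable directory
    }
  }
}
```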

@dongjoon-hyun
Member

Thank you. This is a nice feature. I'll try to test more seriously.

Member

@dongjoon-hyun dongjoon-hyun left a comment


We need a clearer definition of how this interacts with the existing spark.history.fs.* configurations. At first glance,

  • Do you want to have per-directory configurations in the future?
  • For now, spark.history.fs.update.interval is supposed to be applied for one scan for all directories?
  • spark.history.fs.cleaner.interval is also supposed to be applied for one scan for all directories?
  • When spark.history.fs.cleaner.maxNum is applied,
    • This PR will consider the total number of files for all directories, right?
    • Which directory will be selected as a victim for the tie?

Since this introduces a bit of ambiguity, could you revise the PR title and provide a corresponding documentation update (docs) together in this PR?

@sarutak
Member Author

sarutak commented Mar 4, 2026

@dongjoon-hyun Thank you for your feedback.

Do you want to have per-directory configurations in the future?

I considered that per-directory configurations (e.g. spark.history.fs.cleaner.*) might be helpful, but such configurations are not supported in this PR. I'd like to start with simple global settings and improve based on user feedback.

For now, spark.history.fs.update.interval is supposed to be applied for one scan for all directories?

Yes.

spark.history.fs.cleaner.interval is also supposed to be applied for one scan for all directories?

Yes.

When spark.history.fs.cleaner.maxNum is applied,
This PR will consider the total number of files for all directories, right?
Which directory will be selected as a victim for the tie?

Yes, the property is applied to the total number of log entries across all directories. As the updated documentation says, when the limit is exceeded, the oldest completed attempts are deleted first regardless of which directory they belong to.
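
A minimal sketch of this global eviction policy, with illustrative types that are not the PR's actual code:

```scala
// One unified list across all directories; the oldest completed attempts
// are dropped first once the cap is exceeded, ignoring which directory
// each entry came from.
case class Entry(id: String, dir: String, endTime: Long)

def selectVictims(entries: Seq[Entry], maxNum: Int): Seq[Entry] = {
  val excess = entries.size - maxNum
  if (excess <= 0) Seq.empty
  else entries.sortBy(_.endTime).take(excess) // oldest first, directory ignored
}
```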

Since this introduces lots of ambiguity a little, could you revise the PR title and provide a corresponding documentation update, docs, together in this PR?

Updated. (You said to revise the PR title, but I assume that was a typo for the PR description, so I've updated only the description.)

Member

@dongjoon-hyun dongjoon-hyun left a comment


Thank you for updating.

Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, I support @sarutak's proposal and this PR's approach. Thank you.

cc @mridulm , @yaooqinn , @LuciferYang , too.

@sarutak
Member Author

sarutak commented Mar 4, 2026

Thanks @yaooqinn for your feedback. I've updated.

@LuciferYang
Contributor

LuciferYang commented Mar 5, 2026

@sarutak I found one potential correctness issue:

In FsHistoryProvider, AttemptInfoWrapper currently stores attempt.logPath as basename (reader.rootPath.getName()), and later operations resolve it via resolveLogPath(attempt.logPath) by scanning configured log dirs and returning the first match.

This may be ambiguous when two configured directories contain the same event log basename (e.g. after migration/copy or multi-cluster aggregation, although the probability is very low). In that case, download/rebuild/cleanup may operate on a different physical file than the one originally indexed.

For example, the following test case:

```scala
test("same log file name across directories resolves incorrectly") {
  val dir2 = Utils.createTempDir(namePrefix = "logDir2")
  try {
    val conf = createTestConf()
      .set(HISTORY_LOG_DIR, s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
    val provider = new FsHistoryProvider(conf)

    val collidingLogName = "shared-event-log"
    val log1 = new File(testDir, collidingLogName)
    writeFile(log1, None,
      SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
      SparkListenerApplicationEnd(5L))
    val log2 = new File(dir2, collidingLogName)
    writeFile(log2, None,
      SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
      SparkListenerApplicationEnd(6L))

    updateAndCheck(provider) { list =>
      list.size should be (2)
      list.map(_.id).toSet should be (Set("app1-id", "app2-id"))
    }

    val attempt = provider.getAttempt("app2-id", None)
    attempt.logSourceFullPath should be (dir2.getAbsolutePath)

    val resolveLogPath = PrivateMethod[(FileSystem, Path)](Symbol("resolveLogPath"))
    val (_, resolvedPath) = provider invokePrivate resolveLogPath(attempt.logPath)
    resolvedPath.toUri.getPath should be (log2.getAbsolutePath)

    provider.stop()
  } finally {
    Utils.deleteRecursively(dir2)
  }
}
```

In a scenario where multiple log directories are configured in SHS and there are event log files with the same name in different directories, I expect the log path of app2 to be resolved to the second directory where it actually resides. However, in practice, resolveLogPath matches the file with the same name in the first directory first, resulting in an incorrect path resolution.

So can we assume that filenames are globally unique across configured directories?

@sarutak
Member Author

sarutak commented Mar 5, 2026

@LuciferYang Thank you for pointing it out. While event log filenames are typically unique in normal operation, duplicates can occur during migration or multi-cluster log aggregation.
I've fixed resolveLogPath to use the source directory hint (logSourceFullPath) that is already recorded during indexing:

  1. When logSourceFullPath is available, the method first checks that specific directory
  2. If the file is not found there (e.g., file was moved), it falls back to scanning all directories

I also added a test based on the one you showed.
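
The hint-first lookup described in steps 1–2 can be sketched as follows. The signature is simplified for illustration; the real resolveLogPath returns a (FileSystem, Path) pair rather than a string:

```scala
// Check the recorded source directory first, then fall back to scanning
// all configured directories if the file is not found there.
def resolveLogPath(
    logName: String,
    sourceHint: Option[String],
    logDirs: Seq[String],
    exists: String => Boolean): Option[String] = {
  val hinted = sourceHint.map(dir => s"$dir/$logName").filter(exists)
  hinted.orElse {
    // Fallback: the file may have been moved to another configured directory
    logDirs.map(dir => s"$dir/$logName").find(exists)
  }
}
```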

@sarutak sarutak force-pushed the shs-multi-log-dirs branch from f1b6e0d to 68272d7 Compare March 5, 2026 16:38
```scala
try {
  checkForLogsInDir(dir, newLastScanTime, allNotStale)
} catch {
  case e: Exception =>
```
Member


Can we catch more narrow and specific exception instead of Exception?

Member Author


Narrowed to IOException.

Member

@dongjoon-hyun dongjoon-hyun left a comment


Can we have some test coverage for the following cases?

  • What happens when one of the directories is removed while SHS has been running for a while?
  • What happens when one of the directories is not created at all before SHS starts, and is created after SHS is running?

For example, I'm thinking about MONTHLY LOG DIRECTORY scenarios.

s3://spark-events/2026/04/
s3://spark-events/2026/05/

It's not a blocker. We can consider the above as a new JIRA issue, @sarutak .

@sarutak
Member Author

sarutak commented Mar 5, 2026

@dongjoon-hyun Thank you for your suggestion. I'll open a PR for some more test coverage.

@dongjoon-hyun
Member

Thank you! Feel free to merge this PR first so that we can get more feedback from the community, @sarutak .

@sarutak sarutak closed this in 325763f Mar 6, 2026
@sarutak
Member Author

sarutak commented Mar 6, 2026

Merged to master. Thank you @dongjoon-hyun @yaooqinn @LuciferYang for the reviews and feedback!

dongjoon-hyun pushed a commit that referenced this pull request Mar 7, 2026
…ories feature

### What changes were proposed in this pull request?
This PR proposes to add more tests for SHS multiple log directories feature added in SPARK-55793 (#54575).
New tests include:

- **directory removed while SHS is running** — verifies that removing a log directory at runtime does not crash the scan and apps from remaining directories are still listed
- **directory does not exist at startup but created later** — verifies that a directory that doesn't exist at startup is picked up on subsequent scans (monthly directory scenario)
- **directory temporarily inaccessible then recovers** — verifies that apps reappear after a temporarily inaccessible directory is restored
- **all directories inaccessible does not crash** — verifies graceful handling when all configured directories become unavailable
- **config with empty entries between commas** — verifies that empty entries in `spark.history.fs.logDirectory` (e.g., `dir1,,dir2`) are handled correctly
- **logDirectory.names count mismatch falls back to full paths** — verifies that when the number of names doesn't match the number of directories, display names fall back to full paths

### Why are the changes needed?
For better test coverage.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Confirmed that all new tests passed.
```
$ build/sbt 'testOnly org.apache.spark.deploy.history.RocksDBBackendFsHistoryProviderSuite'
```

### Was this patch authored or co-authored using generative AI tooling?
Kiro CLI / Opus 4.6

Closes #54660 from sarutak/shs-multi-log-dirs-more-tests.

Authored-by: Kousuke Saruta <sarutak@amazon.co.jp>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>